/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.lvyh.lightframe.core.invoke.response;

import com.alibaba.fastjson.JSON;
import com.lvyh.lightframe.core.invoke.RpcInvokeContext;
import com.lvyh.lightframe.core.invoke.request.RpcRequest;
import com.lvyh.lightframe.common.util.DateUtils;
import com.lvyh.lightframe.common.exception.RpcRuntimeException;
import com.lvyh.lightframe.core.util.SpringContextUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Asynchronous-to-synchronous bridge for a single RPC call.
 * <p>
 * The instance registers itself with the {@link RpcInvokeContext} under the
 * request id on construction. A consumer thread blocks in {@link #get()} or
 * {@link #get(long, TimeUnit)} until another thread delivers the result via
 * {@link #setResponse(RpcResponse)} (guarded suspension implemented with a
 * {@link ReentrantLock} and a {@link Condition}).
 * <p>
 * Thread-safety: {@code response} is written while holding {@code lock} and is
 * additionally {@code volatile}, so {@link #isDone()} and {@link #getResponse()}
 * may be called without the lock. Waiters always re-check the "response
 * present" predicate while holding the lock, so a response delivered before
 * the waiter starts waiting is never missed.
 */
public class RpcResponseFuture implements Future<RpcResponse> {
    private static final Logger logger = LoggerFactory.getLogger(RpcResponseFuture.class);

    private final RpcRequest request;
    private final ResponseCallback responseCallback;
    private final RpcInvokeContext rpcInvokeContext = SpringContextUtils.getBean(RpcInvokeContext.class);

    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();

    /** The delivered response; {@code null} until {@link #setResponse(RpcResponse)} is called. */
    private volatile RpcResponse response;

    /**
     * Creates a future without a callback and registers it in the invoke context.
     *
     * @param request the request this future belongs to
     */
    public RpcResponseFuture(RpcRequest request) {
        this(request, null);
    }

    /**
     * Creates a future and registers it in the invoke context under the request id.
     *
     * @param request          the request this future belongs to
     * @param responseCallback callback exposed through {@link #getResponseCallback()}; may be {@code null}
     */
    public RpcResponseFuture(RpcRequest request, ResponseCallback responseCallback) {
        this.request = request;
        this.responseCallback = responseCallback;
        this.rpcInvokeContext.setResponseFuture(request.getRequestId(), this);
    }

    /**
     * Removes this future's registration (keyed by request id) from the invoke context.
     */
    public void removeResponseFuture() {
        this.rpcInvokeContext.removeResponseFuture(request.getRequestId());
    }

    /**
     * @return the request this future was created for
     */
    public RpcRequest getRequest() {
        return request;
    }

    /**
     * @return the response delivered so far, or {@code null} if none has been set
     */
    public RpcResponse getResponse() {
        return response;
    }

    /**
     * @return the callback passed at construction, or {@code null} if none was given
     */
    public ResponseCallback getResponseCallback() {
        return responseCallback;
    }

    /**
     * Stores the response and wakes up every thread blocked in {@code get}.
     *
     * @param response the response to deliver
     */
    public void setResponse(RpcResponse response) {
        // Acquire outside the try block: if lock() itself fails we must not call unlock().
        lock.lock();
        try {
            // Publish under the lock so a waiter that has just evaluated the
            // predicate cannot miss the signal that follows.
            this.response = response;
            // signalAll: more than one thread may be waiting on the same future.
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Cancellation is not supported.
     *
     * @return always {@code false}
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    /**
     * @return always {@code false}; this future cannot be cancelled
     */
    @Override
    public boolean isCancelled() {
        return false;
    }

    /**
     * @return {@code true} once a non-null response has been set
     */
    @Override
    public boolean isDone() {
        return response != null;
    }

    /**
     * Waits without a time limit until a response is available.
     *
     * @return the delivered response
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Override
    public RpcResponse get() throws InterruptedException, ExecutionException {
        try {
            // A negative timeout means "wait indefinitely" in get(long, TimeUnit).
            return get(-1, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new RpcRuntimeException(e);
        }
    }

    /**
     * Waits for the response for at most the given time.
     *
     * @param timeout maximum time to wait; a negative value waits indefinitely
     * @param unit    unit of {@code timeout}
     * @return the delivered response
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws RpcRuntimeException  if no response is available when the wait ends
     */
    @Override
    public RpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {

        if (TimeUnit.MILLISECONDS != unit) {
            timeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
        }

        logger.info("[RpcResponseFuture] consumer get response future,isDone：{},timeout:{}, response:{},currTime:{}", isDone(), timeout, JSON.toJSONString(response), DateUtils.dateToMillisStr(new Date()));
        if (!isDone()) {
            lock.lock();
            try {
                if (timeout < 0) {
                    // Loop on the predicate: guards against spurious wakeups and
                    // against a response that arrived before we took the lock.
                    while (!isDone()) {
                        condition.await();
                    }
                } else {
                    long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeout);
                    // awaitNanos returns the time still left, so a spurious wakeup
                    // resumes waiting for the remainder instead of ending early.
                    while (!isDone() && remainingNanos > 0) {
                        remainingNanos = condition.awaitNanos(remainingNanos);
                    }
                }
            } catch (InterruptedException e) {
                logger.error("[RpcResponseFuture] consumer get response InterruptedException,isDone：{}, response:{}", isDone(), JSON.toJSONString(response));
                throw e;
            } finally {
                lock.unlock();
            }
        }

        logger.info("[RpcResponseFuture] consumer get response future wait timeout,isDone：{},timeout:{}, response:{},currTime:{}", isDone(), timeout, JSON.toJSONString(response), DateUtils.dateToMillisStr(new Date()));
        // Read the field once so the null check and the returned value agree.
        RpcResponse result = response;
        if (result == null) {
            throw new RpcRuntimeException("[RpcResponseFuture] rpc request timeout at:" + System.currentTimeMillis() + ", request:" + request.toString());
        }
        return result;
    }


}
